/**
 * FileName: AutoProcessChain
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/3/21 18:20
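 * Description: Wrapper around ProcessChain that picks processing steps based on the file names listed in the "spark.files" setting.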
 */
package cn.com.bonc.process.chain;

import cn.com.bonc.process.Process;
import cn.com.bonc.process.impl.*;
import cn.com.bonc.util.ProcessCfgUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.immutable.Map;

/**
 * Fluent wrapper around a {@link ProcessChain} that can populate the chain
 * automatically from the file names found in the Spark runtime configuration.
 *
 * <p>Instances are created through {@link #setSourceData(Dataset)}. Every mutating
 * method returns {@code this} so calls can be chained. If the wrapped chain is
 * {@code null} (i.e. {@code ProcessChain.setSourceData} returned {@code null}),
 * the add methods do nothing and {@link #execute()} returns {@code null}.
 */
public class AutoProcessChain {

    /** Runtime configuration key whose value is searched for the file names below. */
    private static final String SPARK_FILES_KEY = "spark.files";

    /** File name that enables {@code RuleActionProcessImpl}. */
    private static final String RULE_ACTION_FILE = "rule-action.json";

    /** File name that enables the {@code Col2MutiColsProcessImpl} / {@code MutiCols2ColProcessImpl} pair. */
    private static final String COLS_MAP_FILE = "cols-map.properties";

    /** File name that enables the optional steps selected through {@code ProcessCfgUtil}. */
    private static final String PROCESS_CFG_FILE = "process.properties";

    /** The wrapped chain; assigned once in the constructor. */
    private final ProcessChain processChain;

    private AutoProcessChain(ProcessChain processChain) {
        this.processChain = processChain;
    }

    /**
     * Creates a new chain for the given source data.
     *
     * @param rowDataset the dataset handed to {@code ProcessChain.setSourceData}
     * @return a new {@code AutoProcessChain} wrapping the resulting {@code ProcessChain}
     */
    public static AutoProcessChain setSourceData(Dataset<Row> rowDataset) {
        return new AutoProcessChain(ProcessChain.setSourceData(rowDataset));
    }

    /**
     * Adds processes to the chain depending on which file names appear in the
     * {@code spark.files} runtime setting (per the original author's note, the
     * files passed with {@code --files}).
     *
     * <p>Processes are appended in this order:
     * <ol>
     *   <li>{@code RuleActionProcessImpl} if {@code rule-action.json} is listed;</li>
     *   <li>if {@code cols-map.properties} is listed: {@code Col2MutiColsProcessImpl},
     *       then (only when {@code process.properties} is also listed) the optional
     *       steps chosen through {@code ProcessCfgUtil}, then
     *       {@code MutiCols2ColProcessImpl}.</li>
     * </ol>
     * File names are matched by substring against the setting's value. When the
     * setting is absent nothing is added.
     *
     * @param sparkSession session whose runtime configuration is inspected; also
     *                     passed to the processes that require a session
     * @return this instance, for chaining
     */
    public AutoProcessChain autoAddProcess(SparkSession sparkSession) {
        // Same null-tolerance as addProcess()/execute(): no chain, nothing to do.
        if (processChain == null) {
            return this;
        }

        // Direct lookup; the empty default makes every contains() check below false
        // when the key is not set, so an absent key adds no processes.
        String files = sparkSession.conf().get(SPARK_FILES_KEY, "");

        if (files.contains(RULE_ACTION_FILE)) {
            processChain.addProcess(new RuleActionProcessImpl());
        }
        if (files.contains(COLS_MAP_FILE)) {
            processChain.addProcess(new Col2MutiColsProcessImpl());
            // The optional steps are only considered inside the cols-map pair,
            // i.e. after Col2MutiCols and before MutiCols2Col.
            if (files.contains(PROCESS_CFG_FILE)) {
                addConfiguredProcesses(sparkSession);
            }
            processChain.addProcess(new MutiCols2ColProcessImpl());
        }
        return this;
    }

    /**
     * Appends the optional join, SQL-list and window-count processes, each one
     * only when the corresponding {@code ProcessCfgUtil} check passes.
     */
    private void addConfiguredProcesses(SparkSession sparkSession) {
        if (ProcessCfgUtil.isJoin()) {
            processChain.addProcess(new JoinExternalDataProcessImpl(sparkSession));
        }
        if (ProcessCfgUtil.getSqlList().size() > 0) {
            processChain.addProcess(new SqlListProcessImpl(sparkSession));
        }
        if (ProcessCfgUtil.isWindowAggregate()) {
            processChain.addProcess(new WindowCountProcessImpl());
        }
    }

    /**
     * Manually appends a process to the chain.
     *
     * @param process the process to append
     * @return this instance, for chaining
     */
    public AutoProcessChain addProcess(Process process) {
        if (processChain != null) {
            processChain.addProcess(process);
        }
        return this;
    }

    /**
     * Executes the chain as currently configured.
     *
     * @return the result of {@code ProcessChain.execute()}, or {@code null} when
     *         there is no wrapped chain
     */
    public Dataset<Row> execute() {
        if (processChain != null) {
            return processChain.execute();
        }
        return null;
    }

    /**
     * Convenience method: calls {@link #autoAddProcess(SparkSession)} and then
     * {@link #execute()}.
     *
     * <p>The capitalised method name is kept as-is so existing callers keep compiling.
     *
     * @param sparkSession session forwarded to {@link #autoAddProcess(SparkSession)}
     * @return the dataset returned by {@link #execute()}
     */
    public Dataset<Row> AutoExecute(SparkSession sparkSession) {
        return autoAddProcess(sparkSession).execute();
    }
}
